package com.atguigu.util;

import com.atguigu.common.GmallConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * @author yhm
 * @create 2022-11-16 14:40
 */
public class KafkaUtil {

    // Utility class: no instances.
    private KafkaUtil() {
    }

    /**
     * Builds a Flink Kafka consumer for the given topic using a custom
     * deserialization schema that tolerates tombstone (null-value) records.
     *
     * <p>The stock {@code SimpleStringSchema} throws on a null record value;
     * this schema maps null records/values to an empty string instead.
     *
     * @param topicName Kafka topic to consume
     * @param groupId   Kafka consumer group id
     * @return a configured {@link FlinkKafkaConsumer} producing UTF-8 strings
     */
    public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topicName, String groupId) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, GmallConfig.BOOTSTRAP_SERVERS);
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Custom deserialization schema: avoids the NPE SimpleStringSchema
        // raises when it receives a null (tombstone) record value.
        KafkaDeserializationSchema<String> kafkaDeserializationSchema = new KafkaDeserializationSchema<String>() {
            @Override
            public TypeInformation<String> getProducedType() {
                return BasicTypeInfo.STRING_TYPE_INFO;
            }

            @Override
            public boolean isEndOfStream(String nextElement) {
                // Unbounded stream: never signal end-of-stream.
                return false;
            }

            @Override
            public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                if (record == null || record.value() == null) {
                    return "";
                }
                // Decode explicitly as UTF-8; relying on the platform default
                // charset corrupts non-ASCII payloads on some JVM locales.
                return new String(record.value(), StandardCharsets.UTF_8);
            }
        };
        return new FlinkKafkaConsumer<String>(topicName, kafkaDeserializationSchema, props);
    }

    /**
     * Builds a Flink Kafka producer writing plain strings to the given topic.
     *
     * <p>Note: {@code ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG} is reused here;
     * its value ("bootstrap.servers") is identical to the producer key.
     *
     * @param topicName Kafka topic to write to
     * @return a configured {@link FlinkKafkaProducer} (at-least-once semantics)
     */
    public static FlinkKafkaProducer<String> getFlinkKafkaProducer(String topicName) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, GmallConfig.BOOTSTRAP_SERVERS);
        return new FlinkKafkaProducer<String>(topicName, new SimpleStringSchema(), props);
    }

    /**
     * Returns the WITH-clause of a Flink SQL DDL for a JSON Kafka source table,
     * starting from committed group offsets.
     *
     * @param topicName Kafka topic to read from
     * @param groupId   Kafka consumer group id
     * @return the DDL connector options fragment (starting at "WITH")
     */
    public static String getKafkaDDL(String topicName, String groupId) {
        return "WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + topicName + "',\n" +
                "  'properties.bootstrap.servers' = '" + GmallConfig.BOOTSTRAP_SERVERS + "',\n" +
                "  'properties.group.id' = '" + groupId + "',\n" +
                "  'scan.startup.mode' = 'group-offsets',\n" +
                "  'format' = 'json'\n" +
                ")";
    }

    /**
     * Returns the WITH-clause of a Flink SQL DDL for a JSON Kafka sink table.
     *
     * @param topicName Kafka topic to write to
     * @return the DDL connector options fragment (starting at "WITH")
     */
    public static String getKafkaSinkDDL(String topicName) {
        return "WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + topicName + "',\n" +
                "  'properties.bootstrap.servers' = '" + GmallConfig.BOOTSTRAP_SERVERS + "',\n" +
                "  'format' = 'json'\n" +
                ")";
    }
}
